package com.atguigu.flink.sql.connector;

import com.atguigu.flink.function.WaterSensorMapFunction;
import com.atguigu.flink.pojo.WaterSensor;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

/**
 * Demonstrates writing a continuously updated aggregation result to Kafka
 * with the upsert-kafka connector.
 *
 * <p>A plain kafka connector is append-only and can carry only INSERT rows;
 * a GROUP BY aggregation emits updates (retract/upsert changes), so the
 * upsert-kafka connector — keyed by the table's PRIMARY KEY — is required here.
 */
public class Demo7_WriteUpsertKafka
{
    public static void main(String[] args) {

        // Streaming environment with a fixed parallelism of 2, bridged to the Table API.
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(2);
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

        // Source: raw text lines from a socket, parsed into WaterSensor POJOs.
        SingleOutputStreamOperator<WaterSensor> sensorStream =
            env.socketTextStream("hadoop102", 8888)
               .map(new WaterSensorMapFunction());

        // Register the stream as a temporary view so it can be queried with SQL.
        Table sourceTable = tableEnv.fromDataStream(sensorStream);
        tableEnv.createTemporaryView("t2", sourceTable);

        // Sink table: holds the running total vc per sensor id.
        // PRIMARY KEY drives the Kafka record key used for upserts.
        String sinkDdl =
            "CREATE TABLE t1 ("
                + "  id STRING PRIMARY KEY NOT ENFORCED ,"
                + "  vc INT "
                + ")  WITH ("
                + " 'connector' = 'upsert-kafka',"
                + "  'topic' = 't5',"
                + "  'properties.bootstrap.servers' = 'hadoop102:9092',"
                + "  'key.format' = 'json' ,"
                + "  'value.format' = 'json' "
                + ")";
        tableEnv.executeSql(sinkDdl);

        /*
            Aggregation:
                A GROUP BY over the stream must revise previously emitted results
                as new rows arrive. Kafka itself is an append-only log with no
                random updates, so the plain kafka connector (INSERT-only) cannot
                absorb such a changelog; upsert-kafka encodes updates as
                keyed upsert records instead.
         */
        tableEnv.executeSql("insert into t1 select id,sum(vc) sumVc from t2 group by id");

    }
}
